 1.大数据高速计算引擎Spark（上）之Spark Core中RDD编程高阶下的RDD分区
   
   spark.default.parallelism：（默认的并发数）= 2
   当配置文件spark-default.conf中没有显示的配置，则按照如下规则取值：
   1).本地模式
   spark-shell --master local[N]  spark.default.parallelism = N
   spark-shell --master local    spark.default.parallelism = 1
   2).伪分布式(x为本机上启动的executor数，y为每个executor使用的core数，z
为每个 executor使用的内存)
   spark-shell --master local-cluster[x,y,z]
   spark.default.parallelism = x * y
   3).分布式模式（yarn & standalone）
   spark.default.parallelism = max(应用程序持有executor的core总数, 2) 
   备注：total number of cores on all executor nodes or 2, whichever is larger
   
   经过上面的规则，就能确定了spark.default.parallelism的默认值(配置文件spark-
default.conf中没有显示的配置。如果配置了,则spark.default.parallelism = 配置的值)
   
   SparkContext初始化时，同时会生成两个参数，由上面得到的
   spark.default.parallelism推导出这两个参数的值
// 从集合中创建RDD的分区数
sc.defaultParallelism  = spark.default.parallelism

// 从文件中创建RDD的分区数
sc.defaultMinPartitions = min(spark.default.parallelism, 2)
   
   以上参数确定后，就可以计算 RDD 的分区数了。
def defaultMinPartitions: Int =Math.min(defaultParallelism, 2)
   
   创建 RDD 的几种方式：
   1).通过集合创建
// 如果创建RDD时没有指定分区数，则rdd的分区数 = sc.defaultParallelism
val rdd = sc.parallelize(1 to 100)
rdd.getNumPartitions
   备注：简单的说RDD分区数等于cores总数
   2).通过textFile创建
   val rdd = sc.textFile("data/start0721.big.log")
   rdd.getNumPartitions
   如果没有指定分区数：
      本地文件。rdd的分区数 = max(本地文件分片数, sc.defaultMinPartitions)
      HDFS文件。 rdd的分区数 = max(hdfs文件 block 数, sc.defaultMinPartitions)
   
   备注：
        本地文件分片数 = 本地文件大小 / 32M
		如果读取的是HDFS文件，同时指定的分区数 < hdfs文件的block数，指定的数不
生效。

 2.RDD分区器
   
   以下RDD分别是否有分区器，是什么类型的分区器
val rdd1 = sc.textFile("/wcinput/wc.txt")
rdd1.partitioner

val rdd2 = rdd1.flatMap(_.split("\\s+"))
rdd2.partitioner

val rdd3 = rdd2.map((_, 1))
rdd3.partitioner

val rdd4 = rdd3.reduceByKey(_+_)
rdd4.partitioner

val rdd5 = rdd4.sortByKey()
rdd5.partitioner

   Optionally, a Partitioner for key-value RDDs (e.g. to say that
the RDD is hash-partitioned)
   只有Key-Value类型的RDD才可能有分区器，Value类型的RDD分区器的值是
None。
   分区器的作用及分类：
   在 PairRDD(key,value) 中，很多操作都是基于key的，系统会按照key对数据进行重
组，如groupbykey；
   数据重组需要规则，最常见的就是基于 Hash 的分区，此外还有一种复杂的基于抽样
Range分区方法；
   
   HashPartitioner：最简单、最常用，也是默认提供的分区器。对于给定的key，计算
其hashCode，并除以分区的个数取余，如果余数小于0，则用余数+分区的个数,最后返回
的值就是这个key所属的分区ID.该分区方法可以保证key相同的数据出现在同一个分区中。 
   用户可通过partitionBy主动使用分区器，通过partitions参数指定想要分区的数量。
val rdd1 = sc.makeRDD(1 to 100).map((_, 1))
rdd1.getNumPartitions

// 仅仅是将数据大致平均分成了若干份；rdd并没有分区器
rdd1.glom.collect.foreach(x=>println(x.toBuffer))
rdd1.partitioner

// 主动使用 HashPartitioner
val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10))
rdd2.glom.collect.foreach(x=>println(x.toBuffer))

// 主动使用 HashPartitioner
val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10, rdd1))
rdd3.glom.collect.foreach(x=>println(x.toBuffer))
   
   Spark的很多算子都可以设置 HashPartitioner 的值：
/**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }
   RangePartitioner：简单的说就是将一定范围内的数映射到某一个分区内。在实现
中,分界的算法尤为重要,用到了水塘抽样算法。sortByKey会使用RangePartitioner。
   现在的问题：在执行分区之前其实并不知道数据的分布情况，如果想知道数据分区就
需要对数据进行采样；
   Spark中RangePartitioner在对数据采样的过程中使用了水塘采样算法。
   水塘采样：从包含n个项目的集合S中选取k个样本，其中n为一很大或未知的数量，
尤其适用于不能把所有n个项目都存放到主内存的情况；
   在采样的过程中执行了collect()操作，引发了Action操作。
   自定义分区器：Spark允许用户通过自定义的Partitioner对象，灵活的来控制RDD的
分区方式。
   实现自定义分区器按以下规则分区：
       分区0 < 100
       100 <= 分区1 < 200
       200 <= 分区2 < 300
       300 <= 分区3 < 400
       ... ...
       900 <= 分区9 < 1000

package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.immutable

class MyPratitioner(n : Int) extends Partitioner{
  // 有多少个分区数
  override def numPartitions: Int = n

  // 给定key，如果去分区
  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    k / 100
  }
}

object UserDefinedPartitioner {
  def main(args: Array[String]): Unit = {
    // 创建SparkContext
    val conf =
      new SparkConf().setAppName(
        this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // 业务逻辑
    val random = scala.util.Random
    val arr: immutable.IndexedSeq[Int] = (1 to 100).map(idx => random.nextInt(1000))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(arr).map((_, 1))
    rdd1.glom.collect.foreach(x => println(x.toBuffer))

    println("***************************************************************")
    val rdd2 = rdd1.partitionBy(new MyPratitioner(11))
    rdd2.glom.collect.foreach(x => println(x.toBuffer))

    // 关闭SparkContext
    sc.stop()
  }
}

